package com.clp.protocol.iec104.client;

import com.clp.protocol.core.client.ConnectionAlreadyExistException;
import com.clp.protocol.core.event.ListenableContainerEvent;
import com.clp.protocol.iec104.client.event.MasterConnectEvent;
import com.clp.protocol.iec104.client.config.MasterConfig;
import com.clp.protocol.iec104.client.ui.MonitorPanel;
import com.clp.protocol.iec104.client.async.*;
import com.clp.protocol.core.event.EventListener;
import com.clp.protocol.core.event.EventListenerRegister;
import com.clp.protocol.core.client.NettyClient;
import com.clp.protocol.core.client.FailedToConnectException;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * 主站管理器，用于连接和管理主站。单例模式
 */
@Slf4j
public class Iec104MasterManager implements Closeable {
    private static volatile Iec104MasterManager masterManager;

    /**
     * 双检锁
     * @return
     */
    public static Iec104MasterManager get() {
        // 懒加载
        if (masterManager != null && !masterManager.isClosed()) {
            return masterManager;
        }
        synchronized (Iec104MasterManager.class) {
            if (masterManager != null && !masterManager.isClosed()) {
                return masterManager;
            }
            masterManager = new Iec104MasterManager();
        }
        return masterManager;
    }

    static {
        // 确保资源正常关闭
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            synchronized (Iec104MasterManager.class) {
                if (masterManager == null || masterManager.isClosed()) return;
                try {
                    masterManager.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }));
    }

    private final NettyClient nettyClient = new NettyClient(); // 内嵌netty客户端，可确保所有主站使用的都是同一个线程组
    private final MasterContainer container; // 主站容器
    private volatile MonitorPanel monitorPanel; // 监视面板

    private Iec104MasterManager() {
        this.container = new MasterContainer(nettyClient.scheduledExecutorService());
    }

    public void addEventListener(EventListener<ListenableContainerEvent<Master>> listener) {
        container.addContainerEventListener(listener);
    }

    @Nullable
    public Master getMaster(String remoteHost, int remotePort) {
        return container.getOne(remoteHost, remotePort);
    }

    public boolean hasMaster(String remoteHost, int remotePort) {
        return container.contains(remoteHost, remotePort);
    }

    public void forEachMaster(Consumer<Master> consumer) {
        container.forEach(consumer);
    }

    /**
     * 创建 Panel 监视面板
     *
     * @return
     */
    public MonitorPanel getMonitorPanel() {
        if (monitorPanel != null) return monitorPanel;
        synchronized (this) {
            if (monitorPanel != null) return monitorPanel;
            // 这边暂时就用netty的线程
            monitorPanel = new MonitorPanel(this, nettyClient.scheduledExecutorService());
        }
        return monitorPanel;
    }

    private InMaster createMaster(MasterConfig cfg) {
        InMaster master = new InMaster(nettyClient, cfg);
        EventListenerRegister<Master> register = master.getEventListenerRegister();
        register.addEventListener(new EventListener<MasterConnectEvent>() {
            @Override
            public void onEvent(MasterConnectEvent event) {
                MasterConnectEvent.Type eventType = event.getType();
                if (eventType == MasterConnectEvent.Type.AUTO_DISCONNECT_BEFORE) {
                    container.remove(master); // 在自动断开连接之前需要将其从容器中删除
                }
            }
        });
        return master;
    }

    /**
     * 根据指定的配置进行连接
     *
     * @param cfg 主站连接配置
     * @return 对应的主站对象
     */
    public synchronized MasterFuture<Void> connect(MasterConfig cfg) throws Throwable {
        checkNotClosed();
        // 如果检查配置失败，则连接失败
        cfg.check();
        // 检查 从站ip 是否已经被使用
        if (hasMaster(cfg.getRemoteHost(), cfg.getRemotePort())) {
            throw new ConnectionAlreadyExistException(cfg.getRemoteHost(), cfg.getRemotePort());
        }
        InMaster master = createMaster(cfg);
        MasterPromise<Void> retPromise = master.newPromise((Void) null);
        master.getConnector().connect().addListener(new MasterFutureListener<Void>() {
            @Override
            public void operationComplete(MasterFuture<Void> future) {
                if (future.isSuccess()) {
                    container.add(master);
                    retPromise.setSuccess();
                    return;
                }
                retPromise.setFailure(future.cause());
            }
        });
        return retPromise;
    }

    /**
     * 重连
     *
     * @param remoteHost 远程ip
     * @param remotePort 远程端口号
     * @return 异步结果
     */
    public synchronized MasterFuture<Void> reconnect(String remoteHost, int remotePort) {
        return reconnect(container.getOne(remoteHost, remotePort));
    }

    /**
     * 主动重连某个主站
     *
     * @param master
     * @return
     */
    public synchronized MasterFuture<Void> reconnect(Master master) {
        checkNotClosed();
        if (master == null) {
            log.error("重连失败，该主站不存在！");
            return new FailedMasterPromise(null, new FailedToConnectException("主站不存在"));
        }
        return ((InMaster) master).getConnector().reconnect();
    }

    public synchronized CompletableFuture<Boolean> disconnectAll() {
        checkNotClosed();

        if (isClosed() || container.isEmpty()) {
            return CompletableFuture.completedFuture(true);
        }

        List<MasterFuture<Void>> futures = new ArrayList<>();
        for (Master master : container) {
            futures.add(disconnect(master));
        }

        CompletableFuture<Boolean> cf = new CompletableFuture<>();
        final AtomicInteger count = new AtomicInteger(0);
        futures.forEach(future -> {
            future.addListener(new MasterFutureListener<Void>() {
                @Override
                public void operationComplete(MasterFuture<Void> future) {
                    if (future.isSuccess()) {
                        if (count.incrementAndGet() == futures.size()) {
                            cf.complete(true);
                        }
                    }
                }
            });
        });
        return cf;
    }

    /**
     * 关闭指定的主站，如果自动重连失败也会调用这个方法进行关闭（虽然已经关闭了）
     *
     * @param master 要关闭的主站
     * @return
     */
    public synchronized MasterFuture<Void> disconnect(@Nullable Master master) {
        checkNotClosed();
        if (master == null) {
            return new FailedMasterPromise(master, new RuntimeException("主站不存在！"));
        }
        if (isClosed()) {
            return new FailedMasterPromise(master, new RuntimeException("客户端已关闭！"));
        }

        container.remove(master);
        return ((InMaster) master).getConnector().disconnect();
    }

    /**
     * 客户端是否关闭
     *
     * @return
     */
    private boolean isClosed() {
        return nettyClient.isClosed();
    }

    private void checkNotClosed() {
        if (isClosed()) {
            throw new RuntimeException("客户端已关闭！");
        }
    }

    @Override
    public void close() throws IOException {
        if (isClosed()) return;
        try {
            disconnectAll().get();
            nettyClient.close();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
